/****************************************************************************
 * Copyright (c) 2009 Remy Chi Jian Suen and others.
 *
 * This program and the accompanying materials are made
 * available under the terms of the Eclipse Public License 2.0
 * which is available at https://www.eclipse.org/legal/epl-2.0/
 *
 * Contributors:
 *     Remy Chi Jian Suen - initial API and implementation
 *
 * SPDX-License-Identifier: EPL-2.0
 *****************************************************************************/
package org.eclipse.ecf.provider.datashare.nio;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import org.eclipse.core.runtime.Assert;
import org.eclipse.core.runtime.ISafeRunnable;
import org.eclipse.core.runtime.IStatus;
import org.eclipse.core.runtime.ListenerList;
import org.eclipse.core.runtime.SafeRunner;
import org.eclipse.core.runtime.Status;
import org.eclipse.ecf.core.IContainer;
import org.eclipse.ecf.core.IContainerListener;
import org.eclipse.ecf.core.events.IContainerConnectedEvent;
import org.eclipse.ecf.core.events.IContainerDisconnectedEvent;
import org.eclipse.ecf.core.events.IContainerDisposeEvent;
import org.eclipse.ecf.core.events.IContainerEvent;
import org.eclipse.ecf.core.identity.ID;
import org.eclipse.ecf.core.util.ECFException;
import org.eclipse.ecf.datashare.IChannel;
import org.eclipse.ecf.datashare.IChannelConfig;
import org.eclipse.ecf.datashare.IChannelContainerAdapter;
import org.eclipse.ecf.datashare.IChannelContainerListener;
import org.eclipse.ecf.datashare.IChannelListener;
import org.eclipse.ecf.datashare.events.IChannelContainerChannelActivatedEvent;
import org.eclipse.ecf.datashare.events.IChannelContainerChannelDeactivatedEvent;
import org.eclipse.ecf.datashare.events.IChannelContainerEvent;

/**
 * A datashare channel container implementation that creates channels that uses
 * NIO for sending and receiving messages.
 * <p>
 * The channel container facilitates communication at a socket-level to a
 * corresponding {@link NIOChannel}. When a request has been received from a
 * remote peer to establish a channel connection, the request can be honoured
 * via calling the {@link #enqueue(SocketAddress)} method with the remote peer's
 * corresponding socket address as the parameter.
 * </p>
 * <p>
 * Subclasses must implement the following:
 * <ul>
 * <li>For channel creation:
 * <ul>
 * <li>{@link #createNIOChannel(ID, IChannelListener, Map)}</li>
 * </ul>
 * <ul>
 * <li>{@link #createNIOChannel(IChannelConfig)}</li>
 * </ul>
 * </li>
 * <li>To facilitate the logging of statuses:
 * <ul>
 * <li>{@link #log(IStatus)}</li>
 * </ul>
 * </li>
 * </ul>
 * </p>
 * <p>
 * <b>Note:</b> This class/interface is part of an interim API that is still
 * under development and expected to change significantly before reaching
 * stability. It is being made available at this early stage to solicit feedback
 * from pioneering adopters on the understanding that any code that uses this
 * API will almost certainly be broken (repeatedly) as the API evolves.
 * </p>
 */
public abstract class NIODatashareContainer implements IChannelContainerAdapter {

	/**
	 * A thread for establishing a connection to remote clients.
	 */
	private Thread connectionThread;

	/**
	 * A list of IP addresses that should be connected to.
	 */
	private LinkedList pendingConnections;

	/**
	 * A list of socket channels that needs to be processed for handshaking with
	 * the remote peer.
	 */
	private List pendingSockets;

	/**
	 * A map of datashare channels owned by this container mapped by their ids.
	 */
	private Map channels;

	/**
	 * The parent container of this datashare container.
	 */
	private IContainer container;

	/**
	 * List of IChannelContainerListeners attached to this datashare container.
	 */
	private ListenerList listenerList;

	/**
	 * Instantiates a new datashare container that will connect to remote
	 * clients using NIO functionality.
	 * 
	 * @param container
	 *            the parent container of this datashare container, must not be
	 *            <code>null</code>
	 */
	public NIODatashareContainer(IContainer container) {
		Assert.isNotNull(container, "Container cannot be null"); //$NON-NLS-1$
		this.container = container;

		container.addListener(new IContainerListener() {
			public void handleEvent(IContainerEvent event) {
				if (event instanceof IContainerConnectedEvent) {
					ID id = ((IContainerConnectedEvent) event).getTargetID();
					fireChannelConnectedEvent(id);
				} else if (event instanceof IContainerDisconnectedEvent) {
					ID id = ((IContainerDisconnectedEvent) event).getTargetID();
					fireChannelDisconnectedEvent(id);

					disconnect();
				} else if (event instanceof IContainerDisposeEvent) {
					// also invoke disconnect() here in case a disconnection
					// event was never fired
					disconnect();
				}
			}
		});

		channels = new HashMap();
		pendingConnections = new LinkedList();
		pendingSockets = new ArrayList();
		listenerList = new ListenerList();
	}

	/**
	 * Fires a channel connected event to all of this channel container's
	 * channels notifying that the parent container has connected to the
	 * specified target id.
	 * 
	 * @param containerTargetId
	 *            the target id that the parent container has connected to
	 */
	private void fireChannelConnectedEvent(ID containerTargetId) {
		synchronized (channels) {
			for (Iterator it = channels.values().iterator(); it.hasNext();) {
				NIOChannel channel = (NIOChannel) it.next();
				channel.fireChannelConnectEvent(containerTargetId);
			}
		}
	}

	/**
	 * Fires a channel disconnected event to all of this channel container's
	 * channels notifying that the parent container has disconnected from the
	 * specified target id.
	 * 
	 * @param containerTargetId
	 *            the target id that the parent container has disconnected from
	 */
	private void fireChannelDisconnectedEvent(ID id) {
		synchronized (channels) {
			for (Iterator it = channels.values().iterator(); it.hasNext();) {
				NIOChannel channel = (NIOChannel) it.next();
				channel.fireChannelDisconnectEvent(id);
			}
		}
	}

	/**
	 * Notifies the specified listener about the event.
	 * 
	 * @param listener
	 *            the listener to notify
	 * @param event
	 *            the event to notify the listener of
	 */
	private void fireChannelContainerEvent(
			final IChannelContainerListener listener,
			final IChannelContainerEvent event) {
		// use a SafeRunner to send out the notification to ensure that
		// client-side failures do not cause the channel to die
		SafeRunner.run(new ISafeRunnable() {
			public void run() throws Exception {
				listener.handleChannelContainerEvent(event);
			}

			public void handleException(Throwable t) {
				log(new Status(IStatus.ERROR, Util.PLUGIN_ID,
						"Error handling channel container event", t)); //$NON-NLS-1$
			}
		});
	}

	private void fireChannelContainerActivatedEvent(final ID channelId) {
		Object[] listeners = listenerList.getListeners();
		for (int i = 0; i < listeners.length; i++) {
			IChannelContainerListener listener = (IChannelContainerListener) listeners[i];
			fireChannelContainerEvent(listener,
					new IChannelContainerChannelActivatedEvent() {
						public ID getChannelID() {
							return channelId;
						}

						public ID getChannelContainerID() {
							return container.getID();
						}

						public String toString() {
							StringBuffer buffer = new StringBuffer();
							buffer
									.append("IChannelContainerChannelActivatedEvent["); //$NON-NLS-1$
							buffer.append("container=").append( //$NON-NLS-1$
									container.getID());
							buffer.append(",channel=").append(channelId); //$NON-NLS-1$
							buffer.append(']');
							return buffer.toString();
						}
					});
		}
	}

	void fireChannelContainerDeactivatedEvent(final ID channelId) {
		Object[] listeners = listenerList.getListeners();
		for (int i = 0; i < listeners.length; i++) {
			IChannelContainerListener listener = (IChannelContainerListener) listeners[i];
			fireChannelContainerEvent(listener,
					new IChannelContainerChannelDeactivatedEvent() {
						public ID getChannelID() {
							return channelId;
						}

						public ID getChannelContainerID() {
							return container.getID();
						}

						public String toString() {
							StringBuffer buffer = new StringBuffer();
							buffer
									.append("IChannelContainerChannelDeactivatedEvent["); //$NON-NLS-1$
							buffer.append("container=").append( //$NON-NLS-1$
									container.getID());
							buffer.append(",channel=").append(channelId); //$NON-NLS-1$
							buffer.append(']');
							return buffer.toString();
						}
					});
		}
	}

	protected abstract void log(IStatus status);

	/**
	 * Stores the specified channel based on the given ID into this container.
	 * 
	 * @param channel
	 *            the channel to store
	 */
	private void storeChannel(IChannel channel) {
		channels.put(channel.getID(), channel);
	}

	private void disconnect() {
		if (connectionThread != null) {
			connectionThread.interrupt();
			connectionThread = null;
		}
		
		synchronized (pendingConnections) {
			pendingConnections.clear();
		}
		
		synchronized (pendingSockets) {
			for (int i = 0; i < pendingSockets.size(); i++) {
				SocketChannel channel = (SocketChannel) pendingSockets.get(i);
				Util.closeChannel(channel);
			}
			
			pendingSockets.clear();
		}

		synchronized (channels) {
			for (Iterator it = channels.values().iterator(); it.hasNext();) {
				final IChannel channel = (IChannel) it.next();
				// dispose the channel in a SafeRunner so exceptions don't
				// prevent us from disposing other channels
				SafeRunner.run(new ISafeRunnable() {
					public void run() throws Exception {
						channel.dispose();
					}

					public void handleException(Throwable t) {
						log(new Status(IStatus.ERROR, Util.PLUGIN_ID,
								"Error disposing channel: " + channel, t)); //$NON-NLS-1$
					}
				});
			}

			channels.clear();
		}
	}

	/**
	 * Attempts to connect to a remote address that has been enqueued to this
	 * channel container for processing via the {@link #enqueue(SocketAddress)}
	 * method.
	 * 
	 * @param buffer
	 *            the buffer to use for reading and writing data
	 * @throws IOException
	 *             if an IO error occurs while attempting to contact the peer
	 */
	private void connect(ByteBuffer buffer) throws IOException {
		while (!pendingConnections.isEmpty()) {
			// retrieve an IP address to connect to
			SocketAddress remote = (SocketAddress) pendingConnections
					.removeFirst();

			// open a socket channel to the remote address
			SocketChannel socketChannel = SocketChannel.open(remote);

			byte[] bytes = Util.serialize(container.getConnectedID());
			if (bytes == null) {
				// serialization failed, close the socket
				Util.closeChannel(socketChannel);
				return;
			}

			socketChannel.configureBlocking(false);
			Util.write(socketChannel, buffer, bytes);

			pendingSockets.add(socketChannel);
		}
	}

	/**
	 * Enqueues the specified address to be connected to. This should be invoked
	 * after a request has been received from a remote user.
	 * 
	 * @param address
	 *            the address to connect to, cannot be <code>null</code>
	 * @see NIOChannel#sendRequest(ID)
	 */
	public void enqueue(SocketAddress address) {
		Assert.isNotNull(address, "Socket address cannot be null"); //$NON-NLS-1$

		if (connectionThread == null) {
			connectionThread = new Thread(new ConnectionRunnable(), getClass()
					.getName()
					+ "Thread-" + container.getID().toString()); //$NON-NLS-1$
			connectionThread.start();
		}

		pendingConnections.add(address);
	}

	/**
	 * Performs a handshake operation with the remote peer.
	 * 
	 * @param socketChannel
	 *            the socket channel to handshake with
	 * @param data
	 *            the data sent from the channel thus far
	 * @throws ClassNotFoundException
	 *             if a deserialization error occurs
	 * @throws IOException
	 *             if an IO error occurs while reading or writing data
	 */
	private void handshake(SocketChannel socketChannel, ChannelData data)
			throws ClassNotFoundException, IOException {
		// retrieve the data that was sent
		byte[] message = data.getData();

		// read in the response
		ByteArrayInputStream bais = new ByteArrayInputStream(message);
		ObjectInputStream ois = new ObjectInputStream(bais);

		// first response should be the channel id
		ID channelId = (ID) ois.readObject();

		synchronized (channels) {
			// retrieve the channel that corresponds to that id
			IChannel channel = getChannel(channelId);
			if (channel == null) {
				// can't find a channel that corresponds to the id, close the
				// socket
				Util.closeChannel(socketChannel);
			} else {
				// open another ObjectInputStream because each object is
				// serialized separately
				ois = new ObjectInputStream(bais);

				// next id is the id of the remote user
				ID peerId = (ID) ois.readObject();

				// store the peer id and the corresponding socket in the
				// retrieved NIO channel
				NIOChannel datashare = (NIOChannel) channel;
				datashare.put(peerId, socketChannel);

				// check if we have any bytes left to read
				int available = bais.available();
				if (available != 0) {
					// if there are extra bytes that means this is data that
					// the sender has sent to us, we must process these messages
					byte[] received = new byte[available];
					// copy the remaining information
					System.arraycopy(message, message.length - available,
							received, 0, available);
					// process the received data
					datashare.processIncomingMessage(socketChannel, received);
				}
			}
		}
	}

	/**
	 * Processes any pending sockets that are currently queued up in this
	 * channel container and is waiting to initiate the handshake process with
	 * the remote peer.
	 * 
	 * @param buffer
	 *            the buffer to use for reading and writing data
	 * @throws ClassNotFoundException
	 *             if a serialization or deserialization operation encountered
	 *             errors
	 * @throws IOException
	 *             if an IO error occurs while reading or writing data
	 */
	private void processPendingSockets(ByteBuffer buffer)
			throws ClassNotFoundException, IOException {
		for (int i = 0; i < pendingSockets.size(); i++) {
			SocketChannel socketChannel = (SocketChannel) pendingSockets.get(i);

			buffer.clear();
			// read in the response
			ChannelData data = Util.read(socketChannel, buffer);
			if (!data.isOpen()) {
				// the channel isn't open, we should close it on our end also
				Util.closeChannel(socketChannel);
				// remove this channel
				pendingSockets.remove(i);
				i--;
			} else if (data.getData() != null) {
				try {
					handshake(socketChannel, data);
				} finally {
					// this socket has been processed, remove it
					pendingSockets.remove(i);
					i--;
				}
			}
		}
	}

	/**
	 * Creates a new NIO-capable channel within this container.
	 * 
	 * @param channelId
	 *            the ID of the channel, must not be <code>null</code>
	 * @param listener
	 *            the listener for receiving notifications pertaining to the
	 *            created channel, may be <code>null</code> if no listener needs
	 *            to be notified
	 * @param properties
	 *            a map of properties to provide to this channel, may be
	 *            <code>null</code>
	 * @return the created NIOChannel instance
	 * @throws ECFException
	 *             if an error occurred while creating the channel
	 * @see #createChannel(ID, IChannelListener, Map)
	 */
	protected abstract NIOChannel createNIOChannel(ID channelId,
			IChannelListener listener, Map properties) throws ECFException;

	/**
	 * Creates a new NIO-capable channel within this container.
	 * 
	 * @param newChannelConfig
	 *            the configuration for the newly created channel, must not be
	 *            <code>null</code>
	 * @return the created NIOChannel instance
	 * @throws ECFException
	 *             if an error occurred while creating the channel
	 * @see #createChannel(IChannelConfig)
	 */
	protected abstract NIOChannel createNIOChannel(
			IChannelConfig newChannelConfig) throws ECFException;

	public final IChannel createChannel(ID channelId,
			IChannelListener listener, Map properties) throws ECFException {
		Assert.isNotNull(channelId, "Channel id cannot be null"); //$NON-NLS-1$
		IChannel channel = createNIOChannel(channelId, listener, properties);
		if (channel != null) {
			storeChannel(channel);

			fireChannelContainerActivatedEvent(channelId);
		}
		return channel;
	}

	public final IChannel createChannel(IChannelConfig newChannelConfig)
			throws ECFException {
		Assert.isNotNull(newChannelConfig, "Channel config cannot be null"); //$NON-NLS-1$
		Assert.isNotNull(newChannelConfig.getID(),
				"Channel config id cannot be null"); //$NON-NLS-1$

		IChannel channel = createNIOChannel(newChannelConfig);
		if (channel != null) {
			storeChannel(channel);

			fireChannelContainerActivatedEvent(newChannelConfig.getID());
		}
		return channel;
	}

	public void addListener(IChannelContainerListener listener) {
		listenerList.add(listener);
	}

	public IChannel getChannel(ID channelId) {
		Assert.isNotNull(channelId, "Channel id cannot be null"); //$NON-NLS-1$
		return (IChannel) channels.get(channelId);
	}

	public boolean removeChannel(ID channelId) {
		IChannel channel = (IChannel) channels.remove(channelId);
		if (channel == null) {
			return false;
		} else {
			channel.dispose();
			return true;
		}
	}

	public void removeListener(IChannelContainerListener listener) {
		listenerList.remove(listener);
	}

	public Object getAdapter(Class adapter) {
		if (adapter == null) {
			return null;
		} else if (adapter.isInstance(this)) {
			return this;
		} else if (adapter == IContainer.class) {
			return container;
		} else {
			return null;
		}
	}
	
	public String toString() {
		StringBuffer buffer = new StringBuffer(getClass().getName());
		buffer.append("[parentContainer=").append(container).append(']'); //$NON-NLS-1$
		return buffer.toString();
	}

	private class ConnectionRunnable implements Runnable {

		public void run() {
			ByteBuffer buffer = ByteBuffer.allocate(1024);
			while (true) {
				try {
					buffer.clear();

					if (Thread.currentThread().isInterrupted()) {
						return;
					}

					synchronized (pendingConnections) {
						if (!pendingConnections.isEmpty()) {
							connect(buffer);
						}	
					}

					processPendingSockets(buffer);

					Thread.sleep(50);
				} catch (InterruptedException e) {
					Thread.interrupted();
					return;
				} catch (ClassNotFoundException e) {
					log(new Status(IStatus.ERROR, Util.PLUGIN_ID,
							"Could not deserialize", e)); //$NON-NLS-1$
				} catch (IOException e) {
					log(new Status(IStatus.ERROR, Util.PLUGIN_ID,
							"An IO error occurred", e)); //$NON-NLS-1$
				} catch (RuntimeException e) {
					log(new Status(IStatus.ERROR, Util.PLUGIN_ID,
							"A runtime error occurred", e)); //$NON-NLS-1$
				}
			}
		}
	}

}
